package com.leilei.source.scoket;


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink example job that reads text from a socket and prints every record it receives.
 *
 * <p>Usage: {@code FlinkSourceBySocket [host [port]]}. Both arguments are optional; when an
 * argument is omitted the corresponding built-in default is used, so running without
 * arguments behaves as it always did.
 *
 * <p>Something must be listening on the target host/port before the job starts, for example
 * {@code nc -lk 7789} on the target server when the default port is used.
 *
 * @author lei
 * @version 1.0
 * @date 2021/3/8 22:08
 */
public class FlinkSourceBySocket {

    /**
     * Host used when no host argument is supplied. This is a placeholder, not a real
     * address: pass the real host as the first argument (or edit this constant).
     */
    private static final String DEFAULT_HOST = "xxxxxxxxx";

    /** Port used when no port argument is supplied. */
    private static final int DEFAULT_PORT = 7789;

    /**
     * Builds and runs the socket-source job.
     *
     * @param args optional {@code [host [port]]}; missing values fall back to
     *             {@link #DEFAULT_HOST} and {@link #DEFAULT_PORT}
     * @throws IllegalArgumentException if the port argument is not an integer
     * @throws Exception if Flink fails to execute the job
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? parsePort(args[1]) : DEFAULT_PORT;

        // Prepare the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // Source: text read from the socket at host:port.
        // A listener (e.g. `nc -lk <port>`) must already be running on that server.
        DataStream<String> source = env.socketTextStream(host, port);

        // Transformation: none, records are passed through unchanged.

        // Sink: print each record.
        source.print();

        // Submit the job; nothing runs until execute() is called.
        env.execute("flink-source-socket");
    }

    /** Parses the port argument, turning a malformed value into a descriptive error. */
    private static int parsePort(String raw) {
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port argument: '" + raw + "'", e);
        }
    }
}


